package com.atguigu.flinkState;

import com.atguigu.been.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * 	监控水位传感器的水位值，如果水位值在五秒钟之内(processing time)连续上升，
 * 	则报警，并将报警信息输出到侧输出流。
 * @author wky
 * @create 2021-07-18-18:03
 */
public class Timer_State_Test {
    public static void main(String[] args) throws Exception {
        //1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //2.读取端口数据并转换为JavaBean
        SingleOutputStreamOperator<WaterSensor> waterSensorDS = env.socketTextStream("hadoop102", 9999).map(new MapFunction<String, WaterSensor>() {
            @Override
            public WaterSensor map(String value) throws Exception {
                String[] words = value.split(",");
                return new WaterSensor(words[0], Long.parseLong(words[1]), Integer.parseInt(words[2]));
            }
        }).assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<WaterSensor>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                            @Override
                            public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                                return element.getTs()*1000L;
                            }
                        })
        );

        //3.按照传感器ID分组
        KeyedStream<WaterSensor, String> keyedStream = waterSensorDS.keyBy(date -> date.getId());

        //4.使用ProcessFunction实现5秒种水位不下降，则报警，且将报警信息输出到侧输出流
        SingleOutputStreamOperator<WaterSensor> result = keyedStream.process(new KeyedProcessFunction<String, WaterSensor, WaterSensor>() {

            //用来记录上一次水位高度
//            private Integer lastVc = Integer.MIN_VALUE;
            private ValueState<Integer> lastVc;

            //用来记录定时器时间
//            private Long timerTs = Long.MIN_VALUE;

            private ValueState<Long> timerTs;

            @Override
            public void open(Configuration parameters) throws Exception {
                lastVc = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("lastVc",Integer.class,Integer.MIN_VALUE));
                timerTs = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timer",Long.class,Long.MIN_VALUE));
            }

            @Override
            public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {


                //判断当前水位线是否高于上次水位线
                if (value.getVc() > lastVc.value()) {
                    //判断定时器是否重置，是否为第一条数据
                    if (timerTs.value() == Long.MIN_VALUE) {
                        System.out.println("注册定时器。。。");
                        //注册5秒钟之后的定时器
                        System.out.println(ctx.timestamp());

                        timerTs.update(ctx.timestamp() + 5000L);
                        ctx.timerService().registerEventTimeTimer(timerTs.value());
                    }
                } else {
                    //如果水位线没有上升则删除定时器
                    ctx.timerService().deleteEventTimeTimer(timerTs.value());
                    System.out.println("删除定时器。。。");
                    //将定时器的时间重置
                    //重新注册5秒钟之后的定时器
                    System.out.println("重新注册定时器。。。"+(ctx.timestamp()+5000L));
                    ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 5000L);
                    //将定时器的时间重置
                    timerTs.clear();
//                    timerTs.update(Long.MIN_VALUE);
                }
                //最后更新最新的水位线
                lastVc.update(value.getVc());
                System.out.println(lastVc.value());
                out.collect(value);
            }

            //
            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<WaterSensor> out) throws Exception {
                timerTs.clear();
                ctx.output(new OutputTag<String>("sideOut") {
                }, ctx.getCurrentKey() + "报警！！！！！！");
                //重置定时器时间

            }
        });

        result.print("主流");
        result.getSideOutput(new OutputTag<String>("sideOut") {
        }).print("报警信息");


        env.execute();

    }
}
